Airflow 動手玩:(七)Airflow Best Practices


這篇會聊聊 Airflow Best Practices,雖然說是 Best Practices,但是工程不像科學,工程只有適不適合,所以大家並不一定要完全採納,只要適合的就是好的。

Tips

使用 Default Arguments

大多數人第一次寫 DAG,最早碰到的應該就是 Default Arguments,使用 Default Arguments 可以大大減少不斷地在 Task 重複同樣的設定參數。

default_args = {
    'owner': 'someone',
    'depends_on_past': False,
    'start_date': datetime(2020, 2, 24),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'end_date': datetime(2020, 2, 29),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
}

使用 params

雖然在實作中沒講到,但是透過 params 可以傳遞自定義的參數進去 Task,而這個參數如果在很多 Task 都重複時,放在 params 就很方便,因為不用每個 Task 都重複一遍。在 Task 中,我們可以從 context.params 拿到這些參數。

dag = DAG(
    ...,
    params={
        "s3_bucket": "test"
    }
}

使用 Variables,但有限制的使用

一樣在實作中沒講到,但是我們可以在 WebServer 的 Admin → Variables 頁面看到,這裡可以使用 key value 形式創造 Variable。標題寫有限制的使用是因為 value 也可以是 json 的格式,所以我們可以用一個 Variable 就設定好一個 DAG 所需的全部參數。

Variable 頁面

然後在 DAG 裡面,我們可以透過 Variable.get("some key") 拿到 Variable,習慣上還會把 Variable 當成 params,因為像上面的 s3_bucket 可能就會隨環境而變。

dag = DAG(
    ...,
    params=Variable.get("test", deserialize_json=True)
)

使用 Connections 儲存敏感資料

如果從第一篇看過來,應該會覺得用 Connections 儲存連線資訊很正常,但如果還不了解 Airflow 的功能,可能會想要透過環境參數傳連線資訊,所以這裡還是提醒一下用 Connections 儲存敏感資料,除了可以用 Connections 連線之外,如果想在 Task 中拿到敏感資訊也很容易。

from airflow.hooks.base_hook import BaseHook
redis_password = BaseHook.get_connection('redis_default').password

使用 context manager

上面幾點都是有關設定的,接下來看看有關 coding 的。每個 Task 我們都要設定 dag=dag,有時 Task 一多就可能忘記,所以 Airflow 也提供了一個方便的功能 context manager,讓在 context manager 下的 Task 都屬於同一個 DAG。

default_args = {...}
with DAG(dag_id='test', default_args=args, schedule_interval='0/1 0 * * *') as dag:
    task_1 = BashOperator(
        task_id='task_1',
    bash_command='echo "say hi"')

    task_2 = DummyOperator(task_id='task_2')

    task_1 >> task_2

同一個 level 的 Task 使用 List 表示

使用 List 表示,可以簡化程式碼。

get_timestamp >> branching >> [store_in_redis, skip]

使用 List 排列 DAG

總結

感謝大家看到這邊,總算完成這次的寫作松了,可以開瓶慶祝了🍾🍾🍾。不過最重要的還是希望在這七篇中可以更了解如何使用 Airflow!謝謝大家!

資料來源

#Airflow #Data Pipeline #Data Cleaning #ETL







你可能感興趣的文章

【筆記】Git 入門筆記——使用 GitHub

【筆記】Git 入門筆記——使用 GitHub

The introduction and difference between class component and function component in React

The introduction and difference between class component and function component in React

BEM

BEM






留言討論